序言
我们都听过愚公移山的故事,对个人来讲,移走一座山需要耗费的时间无疑是十分漫长的。因此,愚公找亲朋好友帮忙一起挖山,最终感动了天帝,天帝叫部下施展了法力将太行、王屋山移走。
愚公不愚,知道将任务拆解,分而治之。
那么,在分布式任务中,我么又何须使用传统的定时任务框架去完整的执行一个任务呢?
现在,让我们使用起 Elastic Job 吧!
简介
Elastic-Job-Lite 定位为轻量级无中心化解决方案,使用 jar 包的形式提供最轻量级的分布式任务的协调服务,外部依赖仅 Zookeeper。
Elastic-Job 是一个分布式调度解决方案,由两个相互独立的子项目 Elastic-Job-Lite 和 Elastic-Job-Cloud 组成。
基本概念
分片
任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。
例如:有一个遍历数据库某张表的作业,现有 2 台服务器。
为了快速的执行作业,那么每台服务器应执行作业的 50% 。
为满足此需求,可将作业分成 2 片,每台服务器执行 1 片。
因此,作业遍历数据的逻辑应为:
- 服务器 A 遍历 ID 以奇数结尾的数据;
- 服务器 B 遍历 ID 以偶数结尾的数据
如果分成10片,则作业遍历数据的逻辑应为每片分到的分片项应为 ID % 10,因此:
- 服务器 A 被分配到分片项 0,1,2,3,4
- 服务器 B 被分配到分片项 5,6,7,8,9
直接的结果就是服务器 A 遍历 ID 以 0-4 结尾的数据;服务器 B 遍历 ID 以 5-9 结尾的数据。
分片项与业务处理解耦
Elastic-Job 并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。
为此,Elastic-Job 中使用个性化参数来处理这种关系。
个性化参数及适用场景
个性化参数,即 shardingItemParameter,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。
例如:按照地区水平拆分数据库,数据库 A 是北京的数据;数据库 B 是上海的数据;数据库 C 是广州的数据。
如果仅按照分片项配置,开发者需要了解:
- 0 表示北京
- 1 表示上海
- 2 表示广州
合理使用个性化参数可以让代码更可读,如果配置为 0 = 北京,1 =上海,2 = 广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。
简而言之,分片数字和个性化参数可以形成一个映射关心。
核心理念
分布式调度
Elastic-Job-Lite 并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。
注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。
作业高可用
Elastic-Job-Lite 提供最安全的方式执行作业。
若将分片总数设置为 1,并使用多于 1 台的服务器执行作业,那么作业将会以 1 主 n 从的方式执行。
一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。
注:开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。
资源最大化
Elastic-Job-Lite 提供灵活的配置方式,可以最大限度的提高执行作业的吞吐量。
对于分片项的设置,应该大于服务器的数量,且最好是大于服务器倍数的数量,此时作业将会合理的利用分布式资源,动态的分配分片项。
举个栗子:3 台服务器,分片项为 10 片,则分片项分配结果为:
- 服务器 A = 0,1,2
- 服务器 B = 3,4,5
- 服务器 C = 6,7,8,9
若服务器 C 崩溃,则分片项分配结果为:
- 服务器 A = 0,1,2,3,4
- 服务器 B = 5,6,7,8,9
在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。
快速入门
① 首先在pom.xml添加相关依赖:1
2
3
4
5<!-- 版本配置 -->
<properties>
<java.version>1.8</java.version>
<elastic.job.version>2.1.5</elastic.job.version>
</properties>
1 | <!--调度框架--> |
② 配置 ZooKeeper 的相关参数信息(如服务器地址、命名空间):1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16/**
* 调度框架 ZooKeeper 配置
*
* @author lovike
* @since 2020/6/26
*/
("'${elasticjob.zookeeper.host}'.length() > 0")
public class JobZookeeperRegistryCenterConfig {
(initMethod = "init")
public ZookeeperRegistryCenter zookeeperRegistryCenter(@Value("${elasticjob.zookeeper.host}") final String serverList,
@Value("${elasticjob.zookeeper.namespace}") final String namespace) {
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
}
}
③ 创建简单作业(比如一个监控接口超时报警的作业):1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39/**
* @author lovike
* @description: 监控接口超时报警作业
* @since 2020-06-26
*/
4j
public class WatchApiTimeOutJob implements SimpleJob {
// 配置的允许超时的数量
("${elasticJob.apitimeout.rule.count}")
private int count;
// 指定时间范围
("${elasticJob.apitimeout.rule.scope}")
private int scope;
ApiTimeResultMapper apiTimeResultMapper;
public void execute(ShardingContext shardingContext) {
Integer item = shardingContext.getShardingItem();
Integer totalCount = shardingContext.getShardingTotalCount();
log.info("开始执行 elastic-job,任务名称:WatchApiTimeOut,总分片数:{},当前分片数:{}", totalCount, item);
LocalDateTime endTime = LocalDateTime.now();
// 配置时间范围:小时
LocalDateTime startTime = endTime.minusHours(scope);
Long startTimestamp = startTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
Long endTimestamp = endTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
// 从数据库获取一段时间内响应时间超过 1s(该时间与 sql 列有关)以上的接口总数,若其到达配置的某个值, 报警
// 一段时间内:(结束是当前时间,开始时间是当前时间减去可配置的时间)
int apiTimeoutCount = apiTimeResultMapper.selectApiTimeoutCount(new Date(startTimestamp), new Date(endTimestamp));
if (apiTimeoutCount >= count) {
log.info("报警拉,允许接口超时数量:{},当前接口超时数:{}", count, apiTimeoutCount);
}
log.info("执行结束 elastic-job,任务名称:WatchApiTimeOut,总分片数:{},当前分片数: {}", totalCount, item);
}
}
④ 配置作业的相关参数信息并启动作业:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47/**
* @author lovike
* @description: 监控接口超时的配置
* @since 2020-06-26
*/
public class WatchApiTimeOutConfig {
private ZookeeperRegistryCenter zookeeperRegistryCenter;
public WatchApiTimeOutJob watchApiTimeOutJob;
private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
final String cron,
final int shardingTotalCount,
final Boolean enabled) {
// 根据传入的参数构建 JobCoreConfiguration
JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(
jobClass.getName(),
cron,
shardingTotalCount).failover(true).build();
// 根据 JobCoreConfiguration 构建 SimpleJobConfiguration
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(
jobCoreConfiguration, jobClass.getCanonicalName());
return LiteJobConfiguration.newBuilder(simpleJobConfiguration).disabled(!enabled)
.monitorExecution(true).overwrite(true).build();
}
(initMethod = "init", name = "ApiTimeOutJobScheduler")
public JobScheduler simpleJobScheduler(@Value("${elasticJob.apitimeout.cron}") final String cron,
@Value("${elasticJob.apitimeout.shardingTotalCount}") final int shardingTotalCount,
@Value("${elasticJob.apitimeout.enabled}") final Boolean enabled) {
// 使用配置文件参数构建 LiteJobConfiguration
LiteJobConfiguration liteJobConfiguration = getLiteJobConfiguration(
watchApiTimeOutJob.getClass(),
cron,
shardingTotalCount,
enabled);
return new SpringJobScheduler(watchApiTimeOutJob, zookeeperRegistryCenter, liteJobConfiguration);
}
}
下面是application.properties的所有配置信息:1
2
3
4
5
6
7
8
9
10
11
12
13
14# elastic-job zookeeper address
elasticjob.zookeeper.host=xx.xx.xx.xx:2181
elasticjob.zookeeper.namespace=elasticjob-job-lite-apiwatch
# 定时器执行时间
elasticJob.apitimeout.cron= 0/10 * * * * ?
# 分片总数
elasticJob.apitimeout.shardingTotalCount=1
# 是否启动
elasticJob.apitimeout.enabled=true
# 允许接口数
elasticJob.apitimeout.rule.count=10
# 时间设置
elasticJob.apitimeout.rule.scope=24
参考
文章信息
| 时间 | 说明 |
|---|---|
| 2020-06-03 | 初版 |
| 2020-11-19 | 序言 |